/**
 * Copyright 2019 吉鼎科技.

 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.easyplatform.engine.runtime.batch;

import cn.easyplatform.*;
import cn.easyplatform.contexts.RecordContext;
import cn.easyplatform.contexts.WorkflowContext;
import cn.easyplatform.dao.BizDao;
import cn.easyplatform.dao.DaoException;
import cn.easyplatform.dao.utils.SqlUtils;
import cn.easyplatform.dos.FieldDo;
import cn.easyplatform.dos.Record;
import cn.easyplatform.engine.runtime.ComponentProcess;
import cn.easyplatform.entities.beans.LogicBean;
import cn.easyplatform.entities.beans.batch.BatchBean;
import cn.easyplatform.entities.beans.table.TableBean;
import cn.easyplatform.entities.helper.EventLogic;
import cn.easyplatform.i18n.I18N;
import cn.easyplatform.interceptor.CommandContext;
import cn.easyplatform.lang.Strings;
import cn.easyplatform.spi.listener.event.MessageEvent;
import cn.easyplatform.messages.vos.BatchVo;
import cn.easyplatform.messages.vos.executor.BeginVo;
import cn.easyplatform.messages.vos.executor.EndVo;
import cn.easyplatform.messages.vos.executor.ProgressVo;
import cn.easyplatform.support.scripting.CompliableScriptEngine;
import cn.easyplatform.support.scripting.ScriptEngineFactory;
import cn.easyplatform.support.sql.SqlParser;
import cn.easyplatform.type.DeviceType;
import cn.easyplatform.type.EntityType;
import cn.easyplatform.type.StateType;
import cn.easyplatform.util.RuntimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Executes a {@link BatchBean}: runs its source query and, for every row returned,
 * optionally evaluates the per-record logic and inserts, updates or deletes a
 * record in the target table. Batches chained through {@code nextId} are run one
 * after another.
 * <p>
 * Numbered context parameters written or read by this class:
 * <ul>
 * <li>830/831/832/833 - batch type name, batch id, batch name, target table id</li>
 * <li>814 - process code; 'C', 'U' and 'D' select insert, update and delete</li>
 * <li>815 - set to {@code true} on the per-row source context</li>
 * <li>855 - when true after a row failure, the failure is skipped and the loop continues</li>
 * <li>856/857/858 - total row count, current row index, number of failed rows</li>
 * </ul>
 * The command context is stored in an instance field by {@link #doComponent}, so a
 * single instance must not be used for two executions at the same time.
 *
 * @author <a href="mailto:davidchen@epclouds.com">littleDog</a> <br/>
 * @since 2.0.0 <br/>
 */
public class BatchProcess implements
        ComponentProcess<BatchBean, CommandContext, BatchVo> {

    private static final Logger log = LoggerFactory.getLogger(BatchProcess.class);

    /** Result code of an event logic that lets the batch go on; any other code aborts it. */
    private static final String SUCCESS_CODE = "0000";

    private CommandContext cc;

    /**
     * Runs the given batch and every batch chained after it.
     *
     * @param cc     command context used for entities, DAOs, transactions and messages
     * @param vo     not used by this implementation
     * @param entity the first batch definition to run
     * @return always {@code null}; progress is reported through message events
     */
    @Override
    public BatchVo doComponent(CommandContext cc, CommandContext vo,
                               BatchBean entity) {
        this.cc = cc;
        doBatch(entity);
        return null;
    }

    /**
     * Publishes the batch identity into the workflow context, runs the batch and
     * then either continues with the batch named by {@code nextId} or sends the
     * final {@link EndVo} message.
     *
     * @param bb batch definition to run
     * @throws EntityNotFoundException if {@code nextId} names a batch that does not exist
     */
    private void doBatch(BatchBean bb) {
        WorkflowContext ctx = cc.getWorkflowContext();
        ctx.setParameter("830", BatchBean.class.getSimpleName());
        ctx.setParameter("831", bb.getId()).setParameter("832", bb.getName());
        ctx.setParameter("833", bb.getTargetTable());
        go(bb);
        if (Strings.isBlank(bb.getNextId())) {
            // End of the chain: tell the client the whole run is finished.
            cc.send(new MessageEvent(ctx.getId(), new EndVo()));
            return;
        }
        BatchBean next = cc.getEntity(bb.getNextId());
        if (next == null) {
            throw new EntityNotFoundException(EntityType.BATCH.getName(),
                    bb.getNextId());
        }
        doBatch(next);
    }

    /**
     * Runs one batch definition: onBegin logic, source query, per-row processing,
     * onEnd logic, with transaction handling according to the batch's transaction
     * scope.
     *
     * @param bb batch definition to run
     */
    private void go(BatchBean bb) {
        CompliableScriptEngine recordLogicEngine = null;
        ResultSet rs = null;
        BizDao dao = null;
        // Logic reported as the "source" of a row failure when no onException logic exists.
        EventLogic el = null;
        try {
            if (bb.getTransactionScope() == BatchBean.TRANSACTION_ALL) {
                cc.beginTx();
            }
            log.debug("batch task->{}", bb.getName());
            WorkflowContext ctx = cc.getWorkflowContext();
            if (bb.getOnBegin() != null) {
                el = bb.getOnBegin();
                requireSuccess(bb.getOnBegin(), ctx.getRecord());
            }
            SqlParser<FieldDo> sp = RuntimeUtils.createSqlParser(FieldDo.class);
            String sql = sp.parse(bb.getSourceSql(), ctx.getRecord());
            dao = cc.getBizDao(bb.getSourceDsId());
            BeginVo bv = new BeginVo(I18N.getLabel("batch.execute.begin",
                    bb.getName()));
            if (cc.getEnv().getDeviceType() != DeviceType.JOB) {
                bv.setTotal(dao.getCount(sql, sp.getParams()));
                cc.send(new MessageEvent(ctx.getId(), bv));
            } else {
                // Jobs skip the count query; a total of 1 only serves to enter the row loop.
                bv.setTotal(1);
            }
            RecordContext rc = ctx.getRecord().clone();
            rc.setParameter("856", bv.getTotal());
            rc.setParameter("814", bb.getProcessCode());
            rc.removeParameter("833");
            rc.setParameter("815", true);
            if (bv.getTotal() > 0) {
                if (bb.getOnRecord() != null) {
                    el = bb.getOnRecord();
                    recordLogicEngine = compileRecordLogic(bb.getOnRecord());
                }
                log.debug("batch sql->{},params->{}", sql, sp.getParams());
                rs = dao.executeQuery(sql, sp.getParams());
                ResultSetMetaData rsmd = rs.getMetaData();
                int size = rsmd.getColumnCount();
                TableBean tb = null;
                int failedCount = 0;
                int rowIndex = 0;
                // The batch statement is prepared by the first row that reaches the
                // persistence step, which need not be row 1 when earlier rows failed
                // and were skipped.
                // NOTE(review): the statement is built for that row's process code;
                // rows with a different code are still added to it - confirm that a
                // batch never mixes codes.
                boolean batchPrepared = false;
                while (rs.next()) {
                    // Stop silently when the user is gone or was stopped. Nothing is
                    // committed here; the finally block still closes the transaction.
                    if (cc.getUser() == null || cc.getUser().getState() == StateType.STOP) {
                        return;
                    }
                    rowIndex++;
                    Record record = new Record();
                    for (int index = 1; index <= size; index++) {
                        record.set(SqlUtils.getValue(rs, rsmd, index));
                    }
                    rc.setData(record);
                    // Context that failure handling works on. It starts as the source
                    // context so it is never null and never left over from a previous row.
                    RecordContext current = rc;
                    try {
                        RecordContext target = null;
                        if (!Strings.isBlank(bb.getTargetTable())) {
                            if (tb == null) {
                                tb = cc.getEntity(bb.getTargetTable());
                                if (tb == null) {
                                    throw new EntityNotFoundException(
                                            EntityType.TABLE.getName(),
                                            bb.getTargetTable());
                                }
                            }
                            boolean genKey = tb.isAutoKey()
                                    && rc.getParameterAsChar("814") == 'C';
                            target = ctx.createRecord(RuntimeUtils
                                    .createRecord(cc, tb, genKey));
                            target.setParameter("857", rowIndex);
                            target.setParameter("833", tb.getId());
                            ctx.getRecord().setData(target.getData());
                            current = target;
                        } else {
                            rc.setParameter("857", rowIndex);
                            ctx.getRecord().setData(record);
                        }
                        if (rowIndex % 10 == 0) {
                            cc.send(new MessageEvent(ctx.getId(),
                                    new ProgressVo(rowIndex)));
                        }
                        if (bb.getTransactionScope() == BatchBean.TRANSACTION_RECORD) {
                            cc.beginTx();
                        }
                        if (recordLogicEngine != null) {
                            if (target == null) {
                                recordLogicEngine.eval(rc);
                            } else {
                                recordLogicEngine.eval(rc, target);
                            }
                        }
                        if (target != null) {
                            char code = target.getParameterAsChar("814");
                            if (bb.isBatchUpdate()) {
                                // Batch update: queue the row on one prepared statement.
                                if (!batchPrepared) {
                                    String batchSql = buildBatchSql(tb,
                                            target.getData(), code);
                                    if (batchSql != null) {
                                        dao.prepareBatch(batchSql, false);
                                        batchPrepared = true;
                                    }
                                }
                                addBatchRow(dao, tb, target.getData(), code);
                            } else {
                                persist(dao, tb, target.getData(), code);
                            }
                        } else if (!Strings.isBlank(rc
                                .getParameterAsString("833"))) {
                            // No target table configured, but parameter 833 on the
                            // source context names a table: write the source row there.
                            if (tb == null) {
                                String tid = rc.getParameterAsString("833");
                                tb = cc.getEntity(tid);
                                if (tb == null) {
                                    throw new EntityNotFoundException(
                                            EntityType.TABLE.getName(), tid);
                                }
                            }
                            rc.getData().setKey(tb.getKey());
                            persist(dao, tb, rc.getData(),
                                    rc.getParameterAsChar("814"));
                        }
                        if (bb.getTransactionScope() == BatchBean.TRANSACTION_RECORD) {
                            cc.commitTx();
                        }
                    } catch (Exception ex) {
                        failedCount++;
                        current.setParameter("858", failedCount);
                        if (ex instanceof ScriptEvalExitException) {
                            // If 855 is not true, leave the batch immediately.
                            if (!current.getParameterAsBoolean("855")) {
                                throw new ScriptRuntimeException(
                                        ex.getMessage(), cc.getMessage(
                                        ex.getMessage(), current));
                            }
                        } else if (bb.getOnException() != null) {
                            requireSuccess(bb.getOnException(), current);
                            if (!current.getParameterAsBoolean("855")) {
                                throw ex;
                            }
                        } else {
                            // el is null when the batch has neither onBegin nor onRecord
                            // logic; the original exception must still get through.
                            if (el != null) {
                                if (ex instanceof ScriptEvalException) {
                                    ((ScriptEvalException) ex).setSource(el
                                            .toString());
                                } else if (ex instanceof EasyPlatformWithLabelKeyException) {
                                    ((EasyPlatformWithLabelKeyException) ex)
                                            .setSource(el.toString());
                                }
                            }
                            throw ex;
                        }
                    }
                }
                if (bb.isBatchUpdate() && batchPrepared) {
                    dao.executeBatch();
                }
            }
            if (bb.getOnEnd() != null) {
                requireSuccess(bb.getOnEnd(), rc);
            }
            if (bb.getTransactionScope() == BatchBean.TRANSACTION_ALL) {
                cc.commitTx();
            }
        } catch (Exception ex) {
            if (bb.getTransactionScope() != BatchBean.TRANSACTION_CONTAINER) {
                cc.rollbackTx();
            }
            if (ex instanceof SQLException) {
                throw new EasyPlatformWithLabelKeyException(
                        "dao.biz.query.error", ex, bb.getSourceSql());
            }
            if (ex instanceof EasyPlatformRuntimeException) {
                throw (EasyPlatformRuntimeException) ex;
            }
            if (ex instanceof ScriptRuntimeException) {
                throw (ScriptRuntimeException) ex;
            }
            throw new DaoException("dao.access.transaction.perform", ex,
                    bb.getName());
        } finally {
            // Nested so that a failure in one cleanup step does not skip the others.
            try {
                cc.closeTx();
            } finally {
                try {
                    if (rs != null) {
                        dao.close(rs);
                    }
                } finally {
                    if (recordLogicEngine != null) {
                        recordLogicEngine.destroy();
                    }
                }
            }
        }
    }

    /**
     * Evaluates an event logic and fails unless it returns {@link #SUCCESS_CODE}.
     *
     * @param logic  event logic to evaluate
     * @param record record context the logic runs against and the message is resolved with
     * @throws ScriptRuntimeException if the logic returns any other code
     */
    private void requireSuccess(EventLogic logic, RecordContext record)
            throws Exception {
        String code = RuntimeUtils.eval(cc, logic, record);
        if (!code.equals(SUCCESS_CODE)) {
            throw new ScriptRuntimeException(code, cc.getMessage(code, record));
        }
    }

    /**
     * Creates the script engine for the per-record logic. The script text comes from
     * the referenced logic entity when an id is given, otherwise from the inline content.
     *
     * @param onRecord per-record event logic of the batch
     * @return the engine, or {@code null} when there is no script text
     * @throws EntityNotFoundException if the referenced logic entity does not exist
     */
    private CompliableScriptEngine compileRecordLogic(EventLogic onRecord)
            throws Exception {
        String content = null;
        if (!Strings.isBlank(onRecord.getId())) {
            LogicBean lb = cc.getEntity(onRecord.getId());
            if (lb == null) {
                throw new EntityNotFoundException(
                        EntityType.LOGIC.getName(), onRecord.getId());
            }
            content = lb.getContent().trim();
        } else if (!Strings.isBlank(onRecord.getContent())) {
            content = onRecord.getContent().trim();
        }
        if (Strings.isBlank(content)) {
            return null;
        }
        return ScriptEngineFactory.createCompilableEngine(cc, content);
    }

    /**
     * Writes one record immediately: 'U' updates, 'C' inserts, 'D' deletes by primary
     * key. Any other code does nothing.
     */
    private void persist(BizDao dao, TableBean tb, Record data, char code)
            throws Exception {
        if (code == 'U') {
            dao.update(cc.getUser(), tb.getId(), data, false);
        } else if (code == 'C') {
            dao.insert(cc.getUser(), tb, data, false);
        } else if (code == 'D') {
            dao.delete(cc.getUser(), tb.getId(),
                    RuntimeUtils.createPrimaryKey(tb, data), false);
        }
    }

    /**
     * Adds one record to the prepared batch statement, binding values in the order
     * used by {@link #buildBatchSql}. Any code other than 'C', 'U' or 'D' does nothing.
     */
    private void addBatchRow(BizDao dao, TableBean tb, Record data, char code)
            throws Exception {
        if (code == 'C') {
            dao.addBatch(cc.getUser(), data.getData());
        } else if (code == 'U') {
            // All fields for the SET clause, followed by the key fields for WHERE.
            List<FieldDo> params = new ArrayList<FieldDo>(data.getData());
            Iterator<String> keyItr = data.getKey().iterator();
            while (keyItr.hasNext()) {
                FieldDo fd = data.get(keyItr.next());
                params.add(fd);
            }
            dao.addBatch(cc.getUser(), params);
        } else if (code == 'D') {
            dao.addBatch(cc.getUser(), RuntimeUtils.createPrimaryKey(tb, data));
        }
    }

    /**
     * Builds the parameterized statement for a batch of the given process code.
     *
     * @return the SQL text, or {@code null} when the code is not 'C', 'U' or 'D'
     */
    private static String buildBatchSql(TableBean tb, Record data, char code) {
        switch (code) {
            case 'C':
                return buildInsertSql(tb, data);
            case 'U':
                return buildUpdateSql(tb, data);
            case 'D':
                return buildDeleteSql(tb);
            default:
                return null;
        }
    }

    /** Builds {@code insert into T (a,b) values (?,?)} from the record's fields. */
    private static String buildInsertSql(TableBean tb, Record data) {
        StringBuilder columns = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        Iterator<FieldDo> itr = data.getData().iterator();
        while (itr.hasNext()) {
            FieldDo fd = itr.next();
            columns.append(fd.getRawName());
            marks.append("?");
            if (itr.hasNext()) {
                columns.append(",");
                marks.append(",");
            }
        }
        StringBuilder sb = new StringBuilder();
        sb.append("insert into ").append(tb.getId()).append(" (");
        sb.append(columns).append(") values (").append(marks).append(")");
        return sb.toString();
    }

    /** Builds {@code update T set a=?,b=? where k1=? and k2=?} using the record's own key names. */
    private static String buildUpdateSql(TableBean tb, Record data) {
        StringBuilder sb = new StringBuilder();
        sb.append("update ").append(tb.getId()).append(" set ");
        Iterator<FieldDo> itr = data.getData().iterator();
        while (itr.hasNext()) {
            FieldDo fd = itr.next();
            sb.append(fd.getRawName()).append("=?");
            if (itr.hasNext()) {
                sb.append(",");
            }
        }
        sb.append(" where ");
        Iterator<String> keyItr = data.getKey().iterator();
        while (keyItr.hasNext()) {
            sb.append(keyItr.next()).append("=?");
            if (keyItr.hasNext()) {
                sb.append(" and ");
            }
        }
        return sb.toString();
    }

    /** Builds {@code delete from T where k1=? and k2=?} using the table's key names. */
    private static String buildDeleteSql(TableBean tb) {
        StringBuilder sb = new StringBuilder();
        sb.append("delete from ").append(tb.getId()).append(" where ");
        Iterator<String> itr = tb.getKey().iterator();
        while (itr.hasNext()) {
            sb.append(itr.next()).append("=?");
            if (itr.hasNext()) {
                sb.append(" and ");
            }
        }
        return sb.toString();
    }
}
